﻿using MySql.Data.MySqlClient;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Data;
using System.Data.SqlClient;
using System.Runtime.CompilerServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Common;
using SysLog;
using Dapper;

namespace DataBase
{
    /// <summary>
    /// Data-access helper for up to three configured databases (db1, db2, db3).
    /// <para>
    /// Read operations (<c>DataWorke*</c>, <c>SqlExecuteScalar*</c>) run synchronously on a
    /// short-lived connection. Write operations (<c>SqlExecute*</c>) only append SQL text to
    /// per-instance buffers; <see cref="ClusterEqueue"/> moves the buffered text onto the static
    /// queues, and the <c>ExecuteQueue*</c> loops (intended to run on dedicated threads) execute
    /// the queued SQL.
    /// </para>
    /// <para>
    /// SQL text is executed exactly as supplied; callers are responsible for building safe SQL.
    /// </para>
    /// </summary>
    public class DB : IDisposable
    {
        /// <summary>Pending SQL for db1; filled by <see cref="ClusterEqueue"/>, drained by <see cref="ExecuteQueue1"/>.</summary>
        public static ConcurrentQueue<string> SqlQueue1 = new ConcurrentQueue<string>();

        /// <summary>Pending SQL for db2; set to <c>null</c> by <see cref="ExecuteQueue2"/> when db2 has an empty connection string.</summary>
        public static ConcurrentQueue<string> SqlQueue2 = new ConcurrentQueue<string>();

        /// <summary>Pending SQL for db3; set to <c>null</c> by <see cref="ExecuteQueue3"/> when db3 has an empty connection string.</summary>
        public static ConcurrentQueue<string> SqlQueue3 = new ConcurrentQueue<string>();

        // Retained from the original singleton design; currently not used by Intance.
        private static Lazy<DB> _instance = new Lazy<DB>(() => new DB());

        /// <summary>
        /// Returns a NEW <see cref="DB"/> on every access (not a singleton), so callers must keep
        /// the returned reference for as long as they need its buffered SQL.
        /// </summary>
        public static DB Intance => new DB();

        private const string SqlServerType = "SqlServer";
        private const string MySqlType = "MySql";
        private const string SqlServerSelectId = ";select SCOPE_IDENTITY();";
        private const string MySqlSelectId = ";select LAST_INSERT_ID();";

        private static readonly string dbType = AppConfigurtaionServices.Configuration["AppConfig:Db:DbType:db1"];

        // NOTE(review): dbType2/dbType3 are read from configuration but never referenced in this
        // class; every code path keys off dbType. Confirm whether db2/db3 may use another engine.
        private static readonly string dbType2 = AppConfigurtaionServices.Configuration["AppConfig:Db:DbType:db2"];
        private static readonly string dbType3 = AppConfigurtaionServices.Configuration["AppConfig:Db:DbType:db3"];

        private static readonly string dbconstr = AppConfigurtaionServices.Configuration["AppConfig:Db:DbConnStr:db1"];
        private static readonly string dbconstr2 = AppConfigurtaionServices.Configuration["AppConfig:Db:DbConnStr:db2"];
        private static readonly string dbconstr3 = AppConfigurtaionServices.Configuration["AppConfig:Db:DbConnStr:db3"];

        // Number of worker tasks a queue loop keeps in flight before it waits.
        private static readonly int dbThreadCount = Convert.ToInt32(AppConfigurtaionServices.Configuration["AppConfig:Db:Thread:DbExecuteThreadCount"]);

        // Number of queue entries that ExecuteQueue1 hands to a single worker task.
        private static readonly int batchMsgSqlCount = Convert.ToInt32(AppConfigurtaionServices.Configuration["AppConfig:Db:Thread:BatchMsgSqlCount"]);

        // "Select the last inserted id" suffix per database; empty string means the database is
        // treated as not configured by the methods that check it.
        private string selid1 = "";
        private string selid2 = "";
        private string selid3 = "";

        /// <summary>Database engine name; initialised from the db1 <c>DbType</c> configuration value.</summary>
        public string DbType { get; set; }

        // Per-instance SQL buffers, flushed to the static queues by ClusterEqueue.
        private StringBuilder _db1Sqls = new StringBuilder();

        private StringBuilder _bulkAddOriginalDataSql = new StringBuilder();

        public StringBuilder _db2Sqls = new StringBuilder();

        public StringBuilder _db3Sqls = new StringBuilder();

        // Currently unused.
        private static readonly Random _random = new Random();

        /// <summary>
        /// Picks the "select last inserted id" suffix for each of the three databases, based on
        /// the configured engine and on whether that database's connection string is non-empty.
        /// </summary>
        public DB()
        {
            DbType = dbType;
            selid1 = ResolveSelectIdSql(dbconstr);
            selid2 = ResolveSelectIdSql(dbconstr2);
            selid3 = ResolveSelectIdSql(dbconstr3);
        }

        #region Private helpers

        /// <summary>
        /// Returns the identity-select suffix for the configured engine, or an empty string when
        /// <paramref name="connectionString"/> is empty or the engine is neither SqlServer nor MySql.
        /// </summary>
        private static string ResolveSelectIdSql(string connectionString)
        {
            if (connectionString == "")
            {
                return "";
            }

            if (dbType == SqlServerType)
            {
                return SqlServerSelectId;
            }

            if (dbType == MySqlType)
            {
                return MySqlSelectId;
            }

            return "";
        }

        /// <summary>Writes a database error (exception text followed by <paramref name="detail"/>) to the exception log.</summary>
        private static void LogDbError(Exception e, string detail)
        {
            WriteException we = new WriteException();
            we.SavaException("Database Error;" + e.ToString() + detail);
        }

        /// <summary>
        /// Creates a connection for the configured engine, or returns <c>null</c> when
        /// <paramref name="mdbconstr"/> is empty or the engine is neither SqlServer nor MySql.
        /// </summary>
        private IDbConnection GetConnection(string mdbconstr)
        {
            // Decide on the connection string that was actually requested. The previous MySql
            // branch tested db1's connection string regardless of which database was asked for.
            if (mdbconstr == "")
            {
                return null;
            }

            if (dbType == SqlServerType)
            {
                return new SqlConnection(mdbconstr);
            }

            if (dbType == MySqlType)
            {
                return new MySqlConnection(mdbconstr);
            }

            return null;
        }

        /// <summary>Like <see cref="GetConnection"/>, but throws instead of returning <c>null</c>.</summary>
        /// <exception cref="InvalidOperationException">No connection can be created for the given connection string.</exception>
        private IDbConnection RequireConnection(string connectionString)
        {
            IDbConnection db = GetConnection(connectionString);
            if (db == null)
            {
                throw new InvalidOperationException(
                    "No database connection is available: check the configured DbType and DbConnStr.");
            }

            return db;
        }

        /// <summary>
        /// Runs <paramref name="cmdstr"/> and loads the first result set into a table. Errors are
        /// written to the console and the exception log; the (possibly empty) table is still returned.
        /// </summary>
        /// <param name="consoleLabel">Label placed between the exception text and the SQL in the console message.</param>
        private DataTable LoadTable(string connectionString, string cmdstr, string consoleLabel)
        {
            DataTable table = new DataTable();
            try
            {
                using (IDbConnection db = RequireConnection(connectionString))
                using (IDataReader reader = db.ExecuteReader(cmdstr))
                {
                    table.Load(reader);
                }
            }
            catch (Exception e)
            {
                Console.WriteLine("Database Error;" + e.ToString() + consoleLabel + cmdstr);
                LogDbError(e, "\n数据：cmdstr:" + cmdstr);
            }

            return table;
        }

        /// <summary>
        /// Runs <paramref name="cmdstr"/> followed by <paramref name="selectIdSql"/> and returns the
        /// first column of the first row as an int. Returns 0 when <paramref name="selectIdSql"/> is
        /// empty, when the query fails (the failure is logged), or when no id (or a NULL id) comes back.
        /// </summary>
        private int ExecuteAndReadId(string connectionString, string cmdstr, string selectIdSql)
        {
            if (selectIdSql == "")
            {
                return 0;
            }

            cmdstr += selectIdSql;
            DataTable table = new DataTable();
            try
            {
                // The reader is disposed explicitly; previously only the connection was.
                using (IDbConnection db = RequireConnection(connectionString))
                using (IDataReader reader = db.ExecuteReader(cmdstr))
                {
                    table.Load(reader);
                }
            }
            catch (Exception e)
            {
                LogDbError(e, "\n数据：cmdstr:" + cmdstr);
            }

            if (table.Rows.Count == 0)
            {
                return 0;
            }

            // A NULL identity value arrives as DBNull, which Convert.ToInt32 cannot convert.
            object id = table.Rows[0][0];
            if (id == null || id is DBNull)
            {
                return 0;
            }

            return Convert.ToInt32(id);
        }

        /// <summary>Creates a worker connection: SqlServer when so configured, otherwise MySql.</summary>
        private static IDbConnection CreateWorkerConnection(string connectionString)
        {
            return (dbType == SqlServerType)
                ? (IDbConnection)new SqlConnection(connectionString)
                : (IDbConnection)new MySqlConnection(connectionString);
        }

        /// <summary>
        /// Executes each statement on one connection. A failing statement is logged together with
        /// its SQL and does not prevent the remaining statements from being attempted. This method
        /// is written not to throw so that worker tasks do not fault unnoticed.
        /// </summary>
        private static void ExecuteStatements(string connectionString, string[] statements)
        {
            try
            {
                using (IDbConnection db = CreateWorkerConnection(connectionString))
                {
                    // Statements are executed one by one rather than joined into a single command;
                    // the original code noted that joining them resulted in incomplete inserts.
                    foreach (string sql in statements)
                    {
                        try
                        {
                            db.Execute(sql);
                        }
                        catch (Exception e)
                        {
                            LogDbError(e, " \n " + sql);
                        }
                    }
                }
            }
            catch (Exception e)
            {
                // Creating or disposing the connection failed.
                LogDbError(e, " \n " + string.Join(" \n ", statements));
            }
        }

        /// <summary>Starts a worker task for the current batch contents and then empties the batch.</summary>
        private static Task StartBatch(string connectionString, List<string> batch)
        {
            string[] statements = batch.ToArray();
            Task task = Task.Run(() => ExecuteStatements(connectionString, statements));

            // Cleared only after the task was created, so a failure to start it keeps the batch.
            batch.Clear();
            return task;
        }

        /// <summary>Blocks until at least one running task finishes, then forgets every finished task.</summary>
        private static void WaitForAnyAndPrune(List<Task> running)
        {
            Task.WaitAny(running.ToArray());
            running.RemoveAll(t => t.IsCompleted);
        }

        /// <summary>Blocks until all running tasks finish, logs any task failure, and empties the list.</summary>
        private static void WaitAllAndClear(List<Task> running)
        {
            if (running.Count == 0)
            {
                return;
            }

            try
            {
                Task.WaitAll(running.ToArray());
            }
            catch (AggregateException e)
            {
                LogDbError(e, "");
            }
            finally
            {
                // Always reset, otherwise faulted tasks would stay in the list and the
                // in-flight limit would never be reached again.
                running.Clear();
            }
        }

        /// <summary>
        /// Endless loop that executes each non-blank queue entry in its own task, waiting for all
        /// started tasks whenever the in-flight limit is reached or the queue runs dry.
        /// </summary>
        /// <param name="queueAccessor">Returns the queue; re-evaluated each iteration because the queue fields are public and assignable.</param>
        private static void RunStatementQueue(Func<ConcurrentQueue<string>> queueAccessor, string connectionString)
        {
            List<Task> running = new List<Task>();

            while (true)
            {
                string sql;
                if (!queueAccessor().TryDequeue(out sql))
                {
                    WaitAllAndClear(running);
                    Thread.Sleep(50);
                    continue;
                }

                if (string.IsNullOrWhiteSpace(sql))
                {
                    continue;
                }

                try
                {
                    string[] statements = new[] { sql };
                    running.Add(Task.Run(() => ExecuteStatements(connectionString, statements)));

                    if (dbThreadCount > 0 && running.Count >= dbThreadCount)
                    {
                        WaitAllAndClear(running);
                    }
                }
                catch (Exception e)
                {
                    LogDbError(e, " \n " + sql);
                }
            }
        }

        #endregion

        #region Synchronous queries

        /// <summary>
        /// Runs a query against db1 and returns its first result set. On failure the error is
        /// logged and the table returned is whatever was loaded so far (normally empty).
        /// </summary>
        /// <param name="cmdstr">SQL text to execute as-is.</param>
        public DataTable DataWorke(string cmdstr)
        {
            return LoadTable(dbconstr, cmdstr, "\n数据：cmdstr:");
        }

        /// <summary>
        /// Same as <see cref="DataWorke"/> but against db2. Returns <c>null</c> when db2 is not configured.
        /// </summary>
        /// <param name="cmdstr">SQL text to execute as-is.</param>
        public DataTable DataWorke2(string cmdstr)
        {
            if (selid2 == "")
            {
                return null;
            }

            return LoadTable(dbconstr2, cmdstr, "\n数据：cmdstr");
        }

        /// <summary>
        /// Same as <see cref="DataWorke"/> but against db3. Returns <c>null</c> when db3 is not configured.
        /// </summary>
        /// <param name="cmdstr">SQL text to execute as-is.</param>
        public DataTable DataWorke3(string cmdstr)
        {
            if (selid3 == "")
            {
                return null;
            }

            return LoadTable(dbconstr3, cmdstr, "\n数据：cmdstr");
        }

        /// <summary>
        /// Executes <paramref name="cmdstr"/> on db1 followed by the engine's identity select and
        /// returns the resulting id, or 0 when db1 is not configured, the query fails, or no id is returned.
        /// </summary>
        public int SqlExecuteScalar(string cmdstr)
        {
            return ExecuteAndReadId(dbconstr, cmdstr, selid1);
        }

        /// <summary>Same as <see cref="SqlExecuteScalar"/> but against db2.</summary>
        public int SqlExecuteScalar2(string cmdstr)
        {
            return ExecuteAndReadId(dbconstr2, cmdstr, selid2);
        }

        /// <summary>Same as <see cref="SqlExecuteScalar"/> but against db3.</summary>
        public int SqlExecuteScalar3(string cmdstr)
        {
            return ExecuteAndReadId(dbconstr3, cmdstr, selid3);
        }

        #endregion

        #region Buffered writes

        /// <summary>
        /// Buffers a statement for db1; nothing is executed here.
        /// </summary>
        /// <param name="cmdstr">SQL text to buffer.</param>
        /// <param name="isAddOriginalDataSql">
        /// When true the text is appended verbatim (no separator) to the bulk buffer, which
        /// <see cref="ClusterEqueue"/> places after the regular db1 statements; otherwise the text
        /// is appended to the regular db1 buffer followed by <c>;</c>.
        /// </param>
        /// <returns>Always 0.</returns>
        public int SqlExecute(string cmdstr, bool isAddOriginalDataSql = false)
        {
            try
            {
                if (isAddOriginalDataSql)
                {
                    _bulkAddOriginalDataSql.Append(cmdstr);
                }
                else
                {
                    _db1Sqls.Append(cmdstr).Append(';');
                }
            }
            catch (Exception e)
            {
                // Reached, for example, when the buffers were released by Dispose.
                LogDbError(e, "\n数据：cmdstr:" + cmdstr);
            }

            return 0;
        }

        /// <summary>
        /// Buffers a statement for db2; nothing is executed here.
        /// NOTE(review): unlike <see cref="SqlExecute3"/> there is no "db2 configured" guard
        /// (it was commented out in the original) — confirm this asymmetry is intended.
        /// </summary>
        /// <returns>Always 0.</returns>
        public int SqlExecute2(string cmdstr)
        {
            try
            {
                _db2Sqls.Append(cmdstr).Append(';');
            }
            catch (Exception e)
            {
                LogDbError(e, "\n数据：cmdstr:" + cmdstr);
            }

            return 0;
        }

        /// <summary>
        /// Buffers a statement for db3; nothing is executed here. Does nothing when db3 is not configured.
        /// </summary>
        /// <returns>Always 0.</returns>
        public int SqlExecute3(string cmdstr)
        {
            if (selid3 == "")
            {
                return 0;
            }

            try
            {
                _db3Sqls.Append(cmdstr).Append(';');
            }
            catch (Exception e)
            {
                LogDbError(e, "\n数据：cmdstr:" + cmdstr);
            }

            return 0;
        }

        /// <summary>
        /// Moves the buffered SQL onto the static queues (one entry per database that has a
        /// connection string and non-blank SQL) and then empties the buffers so that this
        /// instance can keep being used.
        /// <para>
        /// NOTE(review): when the regular db1 buffer is blank the method returns immediately and
        /// nothing is enqueued or cleared — including db2/db3 and bulk SQL. This matches the
        /// original behaviour; confirm that it is intended.
        /// </para>
        /// </summary>
        public void ClusterEqueue()
        {
            // Null-conditional because the buffers are released by Dispose and two of them are public fields.
            string strSql1 = _db1Sqls?.ToString();
            string strSql2 = _db2Sqls?.ToString();
            string strSql3 = _db3Sqls?.ToString();

            if (string.IsNullOrWhiteSpace(strSql1) || strSql1 == ";")
            {
                return;
            }

            // Bulk SQL goes after the regular statements; skip it when empty so that no stray
            // statement terminator is appended.
            if (_bulkAddOriginalDataSql != null && _bulkAddOriginalDataSql.Length > 0)
            {
                strSql1 += _bulkAddOriginalDataSql.ToString() + ";";
            }

            try
            {
                if (!string.IsNullOrWhiteSpace(dbconstr) && !string.IsNullOrWhiteSpace(strSql1))
                {
                    SqlQueue1.Enqueue(strSql1);
                }

                if (!string.IsNullOrWhiteSpace(dbconstr2) && !string.IsNullOrWhiteSpace(strSql2))
                {
                    SqlQueue2.Enqueue(strSql2);
                }

                if (!string.IsNullOrWhiteSpace(dbconstr3) && !string.IsNullOrWhiteSpace(strSql3))
                {
                    SqlQueue3.Enqueue(strSql3);
                }
            }
            catch (Exception e)
            {
                LogDbError(e, "\n数据：strSql1:" + strSql1 + "\nstrSql2" + strSql2 + "\nstrSql3" + strSql3);
            }
            finally
            {
                // Empty the buffers instead of nulling them: nulling made every later SqlExecute*
                // call fail with a NullReferenceException, and the bulk buffer was never reset.
                _db1Sqls?.Clear();
                _db2Sqls?.Clear();
                _db3Sqls?.Clear();
                _bulkAddOriginalDataSql?.Clear();
            }
        }

        #endregion

        #region Queue workers

        /// <summary>
        /// Endless worker loop for db1; never returns. Queue entries are grouped into batches of
        /// <c>BatchMsgSqlCount</c>, each batch is executed on its own task, and at most
        /// <c>DbExecuteThreadCount</c> tasks are kept in flight. When the queue is empty a partial
        /// batch is flushed; with nothing running the loop sleeps 50 ms.
        /// Statement failures are logged by the worker task and do not stop the loop.
        /// </summary>
        public static void ExecuteQueue1()
        {
            List<Task> running = new List<Task>();  // worker tasks still in flight
            List<string> batch = new List<string>(); // queue entries for the next worker task

            while (true)
            {
                string sql;
                if (!SqlQueue1.TryDequeue(out sql))
                {
                    // Queue drained: do not hold back a partial batch.
                    if (batch.Count > 0)
                    {
                        running.Add(StartBatch(dbconstr, batch));
                    }

                    if (running.Count > 0)
                    {
                        WaitForAnyAndPrune(running);
                    }
                    else
                    {
                        Thread.Sleep(50);
                    }

                    continue;
                }

                try
                {
                    batch.Add(sql);

                    if (batchMsgSqlCount > 0 && batch.Count >= batchMsgSqlCount)
                    {
                        running.Add(StartBatch(dbconstr, batch));
                    }

                    // Only finished tasks are dropped from the list, so tasks that are still
                    // running keep counting against the limit.
                    if (dbThreadCount > 0 && running.Count >= dbThreadCount)
                    {
                        WaitForAnyAndPrune(running);
                    }
                }
                catch (Exception e)
                {
                    LogDbError(e, " \n " + sql);
                }
            }
        }

        /// <summary>
        /// Endless worker loop for db2. When db2's connection string is empty, sets
        /// <see cref="SqlQueue2"/> to <c>null</c> and returns immediately; otherwise never returns.
        /// Each non-blank queue entry is executed on its own task; the loop waits for all started
        /// tasks when <c>DbExecuteThreadCount</c> of them are in flight or the queue is empty.
        /// </summary>
        public static void ExecuteQueue2()
        {
            if (dbconstr2 == "")
            {
                SqlQueue2 = null;
                return;
            }

            RunStatementQueue(() => SqlQueue2, dbconstr2);
        }

        /// <summary>
        /// Endless worker loop for db3; same behaviour as <see cref="ExecuteQueue2"/> but using
        /// <see cref="SqlQueue3"/> and db3's connection string.
        /// </summary>
        public static void ExecuteQueue3()
        {
            if (dbconstr3 == "")
            {
                SqlQueue3 = null;
                return;
            }

            RunStatementQueue(() => SqlQueue3, dbconstr3);
        }

        #endregion

        #region IDisposable Support

        private bool disposedValue = false; // guards against redundant Dispose calls

        /// <summary>Releases the per-instance SQL buffers (db1, db2, db3) when called from <see cref="Dispose()"/>.</summary>
        /// <param name="disposing">True when called from <see cref="Dispose()"/>, false when called from the finalizer.</param>
        protected virtual void Dispose(bool disposing)
        {
            if (!disposedValue)
            {
                if (disposing)
                {
                    _db1Sqls = null;
                    _db2Sqls = null;
                    _db3Sqls = null;
                }

                disposedValue = true;
            }
        }

        ~DB()
        {
            // Do not change this code. Put clean-up code in Dispose(bool disposing) above.
            Dispose(false);
        }

        /// <summary>Releases the buffers held by this instance and suppresses finalization.</summary>
        public void Dispose()
        {
            // Do not change this code. Put clean-up code in Dispose(bool disposing) above.
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        #endregion
    }
}
